Stream from AWS Kinesis

In this guide, you'll learn how to send data from AWS Kinesis to Tinybird.

If you have a Kinesis Data Stream that you want to send to Tinybird, you can connect the two quickly with Kinesis Data Firehose. This page explains how to integrate Kinesis with Tinybird using Firehose.

1. Push messages from Kinesis to Tinybird

Create a Token with the right scope

In your Workspace, create a Token with the "Create new Data Sources or append data to existing ones" scope.

Create a new Data Stream

Start by creating a new Data Stream in AWS Kinesis (see the AWS documentation for more information).


Create a Firehose Delivery Stream

Next, create a Kinesis Data Firehose Delivery Stream.

Set the Source to Amazon Kinesis Data Streams and the Destination to HTTP Endpoint.

In the Destination Settings, set HTTP Endpoint URL to point to the Tinybird Events API.

https://api.tinybird.co/v0/events?name=<your_datasource_name>&wait=true&token=<your_token_with_DS_rights>

This example is for Workspaces in the GCP europe-west3 region. If necessary, replace the host with the correct one for your Workspace's region. Also note the wait=true parameter. Learn more about it in the Events API docs.

You don't need to create the Data Source in advance; it will automatically be created for you.
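As a minimal sketch, the endpoint URL can be assembled programmatically before pasting it into the Firehose destination settings. The helper name `build_events_url` and the sample token value are hypothetical; the host shown is the one used in the example URL above and needs to be swapped for your Workspace's region.

```python
from urllib.parse import urlencode


def build_events_url(datasource, token, host="https://api.tinybird.co"):
    """Build the Events API URL for the Firehose HTTP Endpoint URL field.

    Hypothetical helper for illustration; it is not part of any Tinybird SDK.
    """
    # wait=true is the parameter called out in this guide.
    query = urlencode({"name": datasource, "wait": "true", "token": token})
    return f"{host}/v0/events?{query}"


# "p.example" is a placeholder, not a real token.
print(build_events_url("firehose", "p.example"))
```

Using `urlencode` keeps the query string valid if the Data Source name or Token ever contains characters that need escaping.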

Send sample messages and check that they arrive in Tinybird

If you don't have an active data stream, you can use a Python script to generate dummy data.
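A generator along these lines could look like the sketch below. It is not the script the guide originally linked to. It assumes the `boto3` package and AWS credentials are available when sending, and the event and product values are made up. The field names (`datetime`, `event`, `product`) were chosen to match the fields extracted later in this guide.

```python
import json
import random
from datetime import datetime, timezone

# Illustrative values only; replace with whatever your messages contain.
EVENTS = ["view", "add_to_cart", "purchase"]
PRODUCTS = ["shirt", "shoes", "hat"]


def make_event(rng):
    """Return one dummy message with a millisecond-precision timestamp."""
    now = datetime.now(timezone.utc)
    return {
        # %f yields microseconds; dropping the last 3 digits leaves milliseconds.
        "datetime": now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
        "event": rng.choice(EVENTS),
        "product": rng.choice(PRODUCTS),
    }


def to_kinesis_records(events):
    """Convert messages into the Records list expected by put_records."""
    return [
        {"Data": json.dumps(e).encode("utf-8"), "PartitionKey": e["product"]}
        for e in events
    ]


def send_events(stream_name, count=10):
    """Send `count` dummy messages to the given Kinesis Data Stream."""
    import boto3  # imported here so the helpers above work without AWS access

    rng = random.Random()
    records = to_kinesis_records([make_event(rng) for _ in range(count)])
    boto3.client("kinesis").put_records(StreamName=stream_name, Records=records)


# Usage (requires boto3 and AWS credentials):
# send_events("<your_stream_name>")
```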

Back in Tinybird, you should see 3 columns filled with data in your Data Source. timestamp and requestId are self-explanatory, and your messages are in records__data.

2. Decode message data


The records__data column contains an array of base64-encoded messages.

To get one row per element of the array, use the ARRAY JOIN clause. You'll also need to decode each message with the base64Decode() function.

Now that the raw JSON is in a column, you can use JSONExtract functions to extract the desired fields:

Decoding messages
NODE decode_messages
SQL >
   SELECT
       base64Decode(encoded_m) message,
       fromUnixTimestamp64Milli(timestamp) kinesis_ts
   FROM firehose
   ARRAY JOIN records__data as encoded_m
 
NODE extract_message_fields
SQL >
   SELECT
       kinesis_ts,
       toDateTime64(JSONExtractString(message, 'datetime'), 3) datetime,
       JSONExtractString(message, 'event') event,
       JSONExtractString(message, 'product') product
   FROM decode_messages

When configuring AWS Kinesis as a Data Source, use the following settings:

  • Set wait=true when calling the Events API. See the Events API docs for more information.
  • Set the buffer size lower than 10 MB in Kinesis.
  • Set 128 shards as the maximum in Kinesis.
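The settings above can be captured as checks in a small configuration builder. The sketch below is a hypothetical helper that returns a dictionary shaped like the `HttpEndpointDestinationConfiguration` argument of boto3's Firehose `create_delivery_stream` call. The ARNs, the endpoint name, and the default buffer values are assumptions for illustration, so verify the structure against the AWS documentation before using it.

```python
MAX_BUFFER_MB = 10   # buffer size must stay below this value
MAX_SHARDS = 128     # maximum shard count from the list above


def check_shard_count(shards):
    """Raise if the Kinesis Data Stream would exceed the shard limit."""
    if not 1 <= shards <= MAX_SHARDS:
        raise ValueError(f"shard count must be between 1 and {MAX_SHARDS}")
    return shards


def http_destination_config(url, role_arn, backup_bucket_arn,
                            buffer_mb=5, interval_s=60):
    """Build an HTTP endpoint destination config for Firehose.

    Hypothetical helper; the defaults are illustrative, not recommendations.
    """
    if "wait=true" not in url:
        raise ValueError("the Events API URL should include wait=true")
    if not 1 <= buffer_mb < MAX_BUFFER_MB:
        raise ValueError(f"buffer size must be below {MAX_BUFFER_MB} MB")
    return {
        "EndpointConfiguration": {"Url": url, "Name": "tinybird-events-api"},
        "BufferingHints": {"SizeInMBs": buffer_mb, "IntervalInSeconds": interval_s},
        "RoleARN": role_arn,
        # Firehose requires an S3 location for records it fails to deliver.
        "S3BackupMode": "FailedDataOnly",
        "S3Configuration": {"RoleARN": role_arn, "BucketARN": backup_bucket_arn},
    }
```

Failing early on an oversized buffer or a missing wait=true is cheaper than debugging rejected deliveries after the stream is live.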

Performance optimizations

It is highly recommended to persist the decoded and unrolled result in a different Data Source. You can do this with a Materialized View: a combination of a Pipe and a Data Source that writes the transformed data to the destination Data Source as soon as new data arrives in the Firehose Data Source.

Don't store what you won't need. In this example, some of the extra columns could be skipped. Add a TTL to the Firehose Data Source to prevent keeping more data than you need.

Alternatively, create the Firehose Data Source with a Null Engine. Data ingested there is still transformed and written to the destination Data Source, but it is not persisted in the Null Engine Data Source itself.
